Production Python — FastAPI, Async & Agentic AI
Lessons learned for developers coming from Spring/Java
1. Project Structure
Python doesn't have Maven conventions. Establish structure early:
myproject/
├── .env                          # local dev only: NEVER commit, NEVER in Docker image
├── .gitignore
├── Dockerfile
├── requirements.txt              # like pom.xml dependencies
├── charts/
│   └── myproject/
│       ├── Chart.yaml
│       ├── values.yaml
│       └── templates/
│           ├── deployment.yaml
│           ├── service.yaml
│           ├── ingress.yaml
│           ├── configmap.yaml
│           ├── secret.yaml       # references Key Vault via CSI driver
│           └── hpa.yaml
└── src/
    ├── main.py                   # entrypoint (like @SpringBootApplication)
    ├── config/
    │   ├── config.py             # centralized config (like application.yml)
    │   ├── exception_handlers.py
    │   └── tracing.py
    ├── controllers/
    │   ├── chat_controller.py    # route handlers (like @RestController)
    │   ├── job_controller.py
    │   └── health_controller.py
    ├── models/
    │   ├── user_request.py       # Pydantic models (like DTOs/records)
    │   └── job_response.py
    ├── service/
    │   ├── agents.py             # agent definitions
    │   └── skills/
    │       └── skills.py         # tool functions (like @Service methods)
    ├── db/
    │   ├── client.py             # async DB client setup & connection
    │   ├── repositories.py       # data access (like @Repository)
    │   └── migrations/
    ├── tasks/
    │   └── celery_tasks.py       # Celery background tasks
    └── samples/
        └── data.yaml
Key conventions:
- controllers/: route definitions only, no business logic (like @RestController)
- service/: business logic, agent orchestration (like @Service)
- db/: all database concerns (connection setup, queries, migrations)
- charts/: Helm charts for AKS deployment, living at the project root alongside the Dockerfile
- models/: Pydantic request/response models (like DTOs/records)
- config/: settings, exception handlers, tracing, and anything else cross-cutting
2. FastAPI Basics (Spring MVC Equivalent)
# controllers/chat_controller.py
from fastapi import APIRouter, Request
from models.user_request import UserRequest

router = APIRouter(prefix="/chat", tags=["chat"])


# GET with path variable, like @GetMapping("/chat/{input}")
@router.get("/{user_input}")
async def chat(user_input: str):
    return {"response": user_input}


# POST with request body, like @PostMapping with @RequestBody
@router.post("/process")
async def process(request: UserRequest):  # auto-validated by Pydantic
    return {"status": "ok"}
# main.py
from fastapi import FastAPI
from controllers.chat_controller import router as chat_router
from controllers.job_controller import router as job_router
from config.exception_handlers import register_exception_handlers

# lifespan is the async context manager defined in section 8
app = FastAPI(lifespan=lifespan)
app.include_router(chat_router)
app.include_router(job_router)
register_exception_handlers(app)
Run from the project root with: uvicorn main:app --app-dir src --host 127.0.0.1 --port 8000 --reload
(--app-dir src puts src/ on the import path, so the controllers.* and config.* imports above resolve.)
3. Configuration & Secrets
Local dev — .env file
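# config/config.py
from pathlib import Path

from pydantic_settings import BaseSettings

# config.py lives at <root>/src/config/config.py, so the project root is parents[2]
ENV_FILE = Path(__file__).resolve().parents[2] / ".env"


class Settings(BaseSettings):
    # Required: app won't start without these (fail-fast like @Value)
    openai_api_key: str
    openai_url: str
    # Optional with defaults
    azure_deployment: str = "gpt-4o"
    max_concurrent_calls: int = 5

    model_config = {
        "env_file": ENV_FILE,  # absolute path, independent of the working directory
        "case_sensitive": False,
    }


settings = Settings()  # validates on import; fails fast if required fields are missing
Key gotcha: a relative env_file such as ".env" is resolved against the current working directory, not the config file, so anchor the path to the config file's location as shown. A missing .env file is skipped silently, which is what lets the same class run unchanged in production.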
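The path gotcha can be checked in isolation with a small stdlib-only sketch. The helper name env_file_for and its levels_up parameter are hypothetical; the default assumes the layout from section 1, where config.py sits at src/config/config.py under the project root.

```python
from pathlib import Path


def env_file_for(config_file: str, levels_up: int = 2) -> Path:
    """Return the .env path anchored to the config module, not the cwd.

    levels_up=2 assumes <root>/src/config/config.py, i.e. the project root
    is two directories above the folder that holds config.py.
    """
    if levels_up < 1:
        raise ValueError("levels_up must be at least 1")
    config_dir = Path(config_file).resolve().parent
    return config_dir.parents[levels_up - 1] / ".env"


if __name__ == "__main__":
    # Inside config.py you would pass __file__ instead of a literal path.
    print(env_file_for("/srv/myproject/src/config/config.py"))
```

Because the result is absolute, it stays the same whether the process is started from the project root, from src/, or from a test runner's temporary directory.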
Production — secrets come from environment variables, injected by Key Vault
Pydantic BaseSettings reads environment variables by default. In production there is no .env file — secrets are injected as env vars by the platform:
# charts/myproject/templates/deployment.yaml (pod spec excerpt)
spec:
  containers:
    - name: api
      env:
        # Non-sensitive config, from ConfigMap
        - name: AZURE_DEPLOYMENT
          valueFrom:
            configMapKeyRef:
              name: myproject-config
              key: azure-deployment
        # Secrets, from Azure Key Vault via CSI driver
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: myproject-secrets   # populated by SecretProviderClass
              key: openai-api-key
      # The CSI driver only syncs the K8s Secret while a pod mounts the volume
      volumeMounts:
        - name: secrets-store
          mountPath: /mnt/secrets-store   # example path
          readOnly: true
  volumes:
    - name: secrets-store
      csi:
        driver: secrets-store.csi.k8s.io
        readOnly: true
        volumeAttributes:
          secretProviderClass: myproject-keyvault
The flow:
Local dev: .env file → Pydantic BaseSettings → settings.openai_api_key
Production: Key Vault → CSI Driver → K8s Secret → env var → Pydantic BaseSettings → settings.openai_api_key
Your application code stays the same — settings.openai_api_key works in both environments. Only the source of the value changes.
4. Pydantic Models (Java Records/DTOs)
# models/user_request.py
from pydantic import BaseModel


class UserRequest(BaseModel):
    user_id: str
    request_type: str
    request_details: str


# FastAPI auto-validates incoming JSON against this:
@router.post("/process")
async def process(request: UserRequest):  # 422 if validation fails
    return {"user": request.user_id}
5. Exception Handlers (@ControllerAdvice Equivalent)
# config/exception_handlers.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse


def register_exception_handlers(app: FastAPI):
    @app.exception_handler(ValueError)
    async def value_error_handler(request: Request, exc: ValueError):
        return JSONResponse(status_code=400, content={"error": str(exc)})

    @app.exception_handler(KeyError)
    async def key_error_handler(request: Request, exc: KeyError):
        return JSONResponse(status_code=404, content={"error": f"Not found: {exc}"})

    @app.exception_handler(Exception)
    async def general_handler(request: Request, exc: Exception):
        return JSONResponse(status_code=500, content={"error": "Internal server error"})
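Which handler fires when several could match? A simplified, framework-free model is sketched here, assuming the lookup walks the exception class's MRO and picks the most specific registered type. The names HANDLERS and resolve_handler are hypothetical and only mirror the three handlers registered in this section; the real framework also routes the catch-all Exception handler through its server-error middleware.

```python
from typing import Callable, Dict, Optional, Tuple, Type

Handler = Callable[[Exception], Tuple[int, dict]]

# Mirrors the three handlers registered above: (status code, JSON body).
HANDLERS: Dict[Type[Exception], Handler] = {
    ValueError: lambda exc: (400, {"error": str(exc)}),
    KeyError: lambda exc: (404, {"error": f"Not found: {exc}"}),
    Exception: lambda exc: (500, {"error": "Internal server error"}),
}


def resolve_handler(
    handlers: Dict[Type[Exception], Handler], exc: Exception
) -> Optional[Handler]:
    """Pick the handler registered for the most specific class of exc."""
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls]
    return None


if __name__ == "__main__":
    for error in (KeyError("job-42"), ValueError("bad input"), RuntimeError("boom")):
        handler = resolve_handler(HANDLERS, error)
        print(type(error).__name__, handler(error))
```

The practical consequence is that subclasses inherit the nearest handler: a UnicodeDecodeError, being a ValueError, would be answered with the 400 handler rather than the generic 500 one.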
6. Async & Blocking Code
The problem — sync calls inside async functions block the entire event loop:
# BAD: blocks the event loop, no other requests are handled while waiting
@router.get("/data")
async def get_data():
    response = requests.get("https://api.example.com/data")  # blocks!
    return response.json()
The fix — use httpx.AsyncClient:
# GOOD: non-blocking, the event loop stays free
@router.get("/data")
async def get_data(request: Request):
    client = request.app.state.http_client
    response = await client.get("https://api.example.com/data")
    return response.json()
How Async Actually Works
- One thread, switches between coroutines at every await point
- Concurrent (multiple things in progress) but not parallel (not truly simultaneous on CPU)
- Perfect for I/O-bound work — waiting for APIs, DBs, file reads
| Work type | Solution |
|---|---|
| Async I/O (APIs, DB) | await directly |
| Sync/blocking library | run_in_executor with ThreadPool |
| Pure CPU (number crunching) | run_in_executor with ProcessPool |
| Long background job (minutes) | Celery |
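The "Sync/blocking library" row can be tried out with the standard library alone. The sketch below uses asyncio.to_thread (Python 3.9+), which hands the call to the event loop's default thread pool; blocking_io is a hypothetical stand-in for any synchronous client call, and the function names are illustrative.

```python
import asyncio
import time


def blocking_io(delay: float) -> str:
    """Stand-in for a sync library call (e.g. a blocking HTTP client)."""
    time.sleep(delay)
    return "done"


async def blocks_the_loop(n: int, delay: float) -> float:
    """Call the sync function directly: handlers end up running one by one."""

    async def handler() -> str:
        return blocking_io(delay)  # never yields to the event loop

    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


async def offloads_to_threads(n: int, delay: float) -> float:
    """Offload the same call to worker threads: the waits overlap."""

    async def handler() -> str:
        return await asyncio.to_thread(blocking_io, delay)

    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"direct call: {asyncio.run(blocks_the_loop(3, 0.1)):.2f}s")
    print(f"to_thread:   {asyncio.run(offloads_to_threads(3, 0.1)):.2f}s")
```

With three calls, the direct version takes roughly the sum of the delays, while the offloaded version takes roughly one delay. This only helps for blocking I/O; pure CPU work in threads still contends for the GIL.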
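For CPU-heavy work, offload to a process pool (threads share the GIL, so a thread pool only helps with blocking I/O):
import asyncio
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor(max_workers=4)


@router.get("/process")
async def process(data: str):
    loop = asyncio.get_running_loop()
    # cpu_heavy_function must be a module-level function so it can be pickled
    result = await loop.run_in_executor(executor, cpu_heavy_function, data)
    return result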
7. Async Concurrency (CompletableFuture.allOf Equivalent)
# Sequential: 2s + 2s = 4s total
result1 = await client.get(url1)
result2 = await client.get(url2)
# Concurrent: max(2s, 2s) = 2s total
result1, result2 = await asyncio.gather(
    client.get(url1),
    client.get(url2),
)
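One difference from CompletableFuture.allOf worth seeing in code is failure handling. By default asyncio.gather raises the first exception to the caller while the other awaitables keep running; with return_exceptions=True it returns exceptions in place, in input order. The fetch coroutine below is a hypothetical stand-in for client.get.

```python
import asyncio
from typing import List, Union


async def fetch(name: str, delay: float, fail: bool = False) -> str:
    """Stand-in for an HTTP call that may fail."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} ok"


async def fetch_all() -> List[Union[str, BaseException]]:
    # Results come back in the order the awaitables were passed in,
    # with exception objects in the slots of the calls that failed.
    return await asyncio.gather(
        fetch("a", 0.05),
        fetch("b", 0.05, fail=True),
        fetch("c", 0.05),
        return_exceptions=True,
    )


if __name__ == "__main__":
    for item in asyncio.run(fetch_all()):
        label = "error" if isinstance(item, BaseException) else "value"
        print(label, item)
```

Callers then need to check each slot with isinstance before using it, which is the price of not losing the successful results.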
With throttling via semaphore:
import asyncio, time
from langchain_core.messages import HumanMessage

semaphore = asyncio.Semaphore(5)  # throttle to 5 concurrent calls


async def timed_call(name: str, coro):
    async with semaphore:
        start = time.perf_counter()  # timing starts once a slot is free
        result = await coro
        elapsed = time.perf_counter() - start
        return {"name": name, "result": result, "elapsed_seconds": round(elapsed, 2)}


@router.get("/concurrent")
async def concurrent_analysis():
    # agent is assumed to be an agent built as in section 10
    queries = ["query 1", "query 2", "query 3"]
    tasks = [
        timed_call(q, agent.ainvoke({"messages": [HumanMessage(content=q)]}))
        for q in queries
    ]
    results = await asyncio.gather(*tasks)
    return {"results": results}
| Java | Python |
|---|---|
| CompletableFuture.supplyAsync() | asyncio.create_task() |
| CompletableFuture.allOf().join() | await asyncio.gather(*tasks) |
| Semaphore(5) | asyncio.Semaphore(5) |
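To convince yourself that the semaphore really caps concurrency, a stdlib-only sketch can count how many jobs are in flight at once. The function run_throttled and its counters are hypothetical, and asyncio.sleep stands in for the LLM or HTTP call.

```python
import asyncio


async def run_throttled(n_jobs: int, limit: int, delay: float = 0.02) -> int:
    """Run n_jobs fake calls with at most `limit` in flight; return the peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(delay)  # stand-in for an LLM or HTTP call
            in_flight -= 1

    # All jobs are scheduled at once; the semaphore decides who may proceed.
    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(run_throttled(n_jobs=20, limit=5)))
```

All twenty coroutines are started by gather, but only as many as the limit ever get past the async with line at the same time.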
8. Managing Resources with Lifespan
Everything that holds open connections belongs in lifespan — created once per pod, cleaned up on shutdown:
# main.py
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI
from redis import asyncio as aioredis  # redis-py's asyncio client

# AsyncDBClient, DB_URL, REDIS_URL and setup_tracing are placeholders for your own code


@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- STARTUP ---
    app.state.http_client = httpx.AsyncClient(timeout=30)
    app.state.db = AsyncDBClient(DB_URL)
    app.state.redis = await aioredis.from_url(REDIS_URL)
    tracer_provider = setup_tracing()

    yield  # app is running and serving requests

    # --- SHUTDOWN (reverse order) ---
    tracer_provider.shutdown()
    await app.state.redis.close()
    await app.state.db.close()
    await app.state.http_client.aclose()


app = FastAPI(lifespan=lifespan)
Access in routes via request.app.state:
@router.get("/items")
async def get_items(request: Request):
    client = request.app.state.http_client
    response = await client.get("https://api.example.com/items")
    return response.json()
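The "created once, closed in reverse order" rule can be sketched without FastAPI by using contextlib.AsyncExitStack, which also closes whatever was already opened if a later startup step fails. FakeResource, the dict standing in for app.state, and the extra log parameter are hypothetical scaffolding for the demo; a real FastAPI lifespan function takes only app.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Dict, List


class FakeResource:
    """Stand-in for an HTTP client, DB client or Redis connection."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    async def aclose(self) -> None:
        self.log.append(f"close {self.name}")


@asynccontextmanager
async def lifespan(state: Dict[str, FakeResource], log: List[str]):
    async with AsyncExitStack() as stack:
        for name in ("http_client", "db", "redis"):
            resource = FakeResource(name, log)
            log.append(f"open {name}")
            # Registered callbacks run in LIFO order when the stack exits.
            stack.push_async_callback(resource.aclose)
            state[name] = resource
        yield  # the app would be serving requests here


async def demo() -> List[str]:
    log: List[str] = []
    async with lifespan({}, log):
        log.append("serving")
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Registering each cleanup right after the resource is created keeps startup and shutdown next to each other, so adding a fourth resource cannot silently skip its close call.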
9. Celery for Background Tasks
Use when work doesn't need to complete within the HTTP request:
User → POST /jobs → FastAPI → "task_id: abc123" (immediate response)
↓
Redis/RabbitMQ (broker)
↓
Celery Worker Pod → does heavy work → stores result
User → GET /jobs/abc123 → "status: done, result: ..."
# tasks/celery_tasks.py (runs in separate worker pod)
from celery import Celery

celery_app = Celery("tasks", broker="redis://redis:6379/0", backend="redis://redis:6379/1")


@celery_app.task(bind=True, max_retries=3)
def process_large_job(self, data: dict):
    try:
        result = do_heavy_work(data)
        return result
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
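# controllers/job_controller.py
from fastapi import APIRouter
from tasks.celery_tasks import process_large_job, celery_app

router = APIRouter(prefix="/jobs", tags=["jobs"])


# Celery's client API is synchronous, so these are plain def handlers:
# FastAPI runs them in a threadpool instead of on the event loop.
@router.post("/")
def create_job(data: dict):
    task = process_large_job.delay(data)
    return {"task_id": task.id}


@router.get("/{task_id}")
def job_status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    if result.failed():
        # On failure result.result is the exception object, which is not JSON-serializable
        return {"status": result.status, "error": str(result.result)}
    return {"status": result.status, "result": result.result if result.successful() else None}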
10. LangChain Agents — Agents Are NOT Tools
A LangChain/LangGraph agent (create_agent) returns a CompiledStateGraph, not a callable tool. You cannot pass agents as tools to other agents directly.
Wrong:
orchestrator = create_agent(model=llm, tools=[sub_agent]) # FAILS
Right — wrap in @tool:
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage

# llm and some_tool are assumed to be defined elsewhere
sub_agent = create_agent(model=llm, name="SubAgent", tools=[some_tool])


@tool("delegate_to_sub_agent")
async def delegate_to_sub_agent(query: str) -> str:
    """Delegates work to the sub-agent."""
    response = await sub_agent.ainvoke({"messages": [HumanMessage(content=query)]})
    return response["messages"][-1].content


orchestrator = create_agent(model=llm, tools=[delegate_to_sub_agent])  # WORKS
11. LLM Fallback + Retry (@Retryable + @Recover Equivalent)
from langchain_openai import AzureChatOpenAI
primary = AzureChatOpenAI(azure_endpoint="...", api_key="...", azure_deployment="gpt-4o")
fallback = AzureChatOpenAI(azure_endpoint="...", api_key="...", azure_deployment="gpt-3.5-turbo")
# Up to 2 attempts on primary (the first call plus 1 retry), then route to fallback
llm = primary.with_retry(stop_after_attempt=2).with_fallbacks([fallback])
# Use like normal — retry/fallback is transparent
response = llm.invoke("Hello")
.with_fallbacks() catches any exception from the primary and routes to the next LLM in the list. .with_retry() retries the same LLM before giving up.
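The control flow behind that chain can be written out by hand. This is a plain-Python sketch of the retry-then-fallback idea, not LangChain's implementation: it omits the backoff wait between attempts, it re-raises the last error rather than whatever the library chooses, and the name call_with_retry_and_fallback is hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence

AsyncCall = Callable[[str], Awaitable[str]]


async def call_with_retry_and_fallback(
    prompt: str,
    primary: AsyncCall,
    fallbacks: Sequence[AsyncCall],
    max_attempts: int = 2,
) -> str:
    """Try primary up to max_attempts times, then each fallback once."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for _ in range(max_attempts):
        try:
            return await primary(prompt)
        except Exception as exc:  # retry on any error, like the default chain
            last_error = exc
    for fallback in fallbacks:
        try:
            return await fallback(prompt)
        except Exception as exc:
            last_error = exc
    assert last_error is not None
    raise last_error


async def _flaky(prompt: str) -> str:
    raise TimeoutError("primary unavailable")


async def _backup(prompt: str) -> str:
    return f"fallback answered: {prompt}"


if __name__ == "__main__":
    print(asyncio.run(call_with_retry_and_fallback("Hello", _flaky, [_backup])))
```

The ordering matters: retries wrap only the primary, so the fallback is reached after the primary has used up its attempts, which matches the order in which .with_retry() and .with_fallbacks() are chained above.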
12. Tracing (Micrometer/Spring Observability Equivalent)
# config/tracing.py
import logging, time, uuid
from functools import wraps

logger = logging.getLogger(__name__)


def trace_llm_call(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        trace_id = str(uuid.uuid4())[:8]
        logger.info(f"[{trace_id}] Starting: {func.__name__}")
        start = time.time()
        try:
            result = await func(*args, **kwargs)
            logger.info(f"[{trace_id}] Completed {func.__name__} in {time.time()-start:.2f}s")
            return result
        except Exception as e:
            logger.error(f"[{trace_id}] Failed {func.__name__} after {time.time()-start:.2f}s: {e}")
            raise
    return wrapper


# Usage:
@tool("my_tool")
@trace_llm_call
async def my_tool(query: str) -> str:
    """Does something."""
    ...
For production, replace with LangSmith (LANGCHAIN_TRACING_V2=true) or OpenTelemetry.
13. Uvicorn, ASGI & Deployment
FastAPI is just an ASGI object — it needs a server to listen on a port and run the event loop.
On AKS — skip Gunicorn, let Kubernetes scale:
# charts/myproject/templates/deployment.yaml
spec:
  replicas: 6
  containers:
    - name: api
      command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Why skip Gunicorn on Kubernetes:
- AKS already handles pod restarts, rolling deploys, scaling
- Gunicorn adds a second layer of worker config that fights with Kubernetes resource limits
- One responsibility per layer is cleaner
For local dev only:
import uvicorn

if __name__ == "__main__":
    # reload=True requires the app as an import string, not the app object
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
This block is ignored in production — uvicorn imports the module and grabs app directly.
14. Corporate Proxy / SSL
import httpx
from langchain_openai import AzureChatOpenAI

# SSL interception means certs won't verify: disable verification for dev only
llm = AzureChatOpenAI(
    ...,
    http_client=httpx.Client(verify=False),
    http_async_client=httpx.AsyncClient(verify=False),
)
15. Docker
FROM python:3.11-slim
ARG PIP_INDEX_URL
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
WORKDIR /app/src
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# Build — pass your corporate Artifactory/JFrog URL for pip
docker build --build-arg PIP_INDEX_URL="https://user:token@corp-artifactory/pypi/simple" -t myapp .
# Run locally with .env for dev
docker run -p 8000:8000 --env-file .env myapp
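Gotchas:
- .env files passed with --env-file must NOT have quotes around values (Docker keeps the quotes as part of the value)
- PIP_INDEX_URL must be an ARG, not ENV, so it does not persist as an environment variable in the final image. Build args can still show up in docker history, so for a URL that embeds a token prefer a BuildKit secret mount.
- The final WORKDIR must match where main.py lives so that imports such as controllers.chat_controller resolve
- Never COPY .env . into the image: secrets do not belong in Docker layers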
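The quoting gotcha is easy to catch before it bites. A small stdlib-only check could look like the sketch below; find_quoted_values is a hypothetical helper, and it only flags values wrapped in a matching pair of single or double quotes.

```python
from typing import List, Tuple


def find_quoted_values(env_text: str) -> List[Tuple[int, str]]:
    """Return (line_number, key) for values wrapped in matching quotes."""
    problems = []
    for number, raw in enumerate(env_text.splitlines(), start=1):
        line = raw.strip()
        # Skip blanks, comments and anything that is not KEY=VALUE.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            problems.append((number, key.strip()))
    return problems


if __name__ == "__main__":
    sample = 'OPENAI_API_KEY="sk-example"\nAZURE_DEPLOYMENT=gpt-4o\n'
    for line_number, key in find_quoted_values(sample):
        print(f"line {line_number}: {key} is quoted; docker --env-file keeps the quotes")
```

Running a check like this in CI or a pre-commit hook avoids the confusing case where a key works with a local dotenv loader but fails inside the container.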
16. The Full Picture
Internet
↓
AKS Ingress (load balancing across pods)
↓
Pod (uvicorn — owns the asyncio event loop)
↓ Key Vault → CSI Driver → env vars
FastAPI (routing, middleware, dependency injection)
↓
async route handler
├── await app.state.http_client.get(...) ← non-blocking I/O
├── await asyncio.gather(call1, call2) ← concurrent calls
├── await app.state.db.find(...) ← async DB
└── heavy_task.delay(data) ← fire-and-forget to Celery
Quick Reference Table
| Concept | Spring/Java | Python |
|---|---|---|
| Web framework | Spring MVC | FastAPI |
| Config | application.yml + @Value | Pydantic BaseSettings + env vars |
| Secrets | Spring Cloud Vault | Key Vault → CSI Driver → env vars |
| DI / Beans | @Bean, @Autowired | Module-level instances (import) |
| Exception handling | @ControllerAdvice | @app.exception_handler() |
| Request validation | @Valid + Bean Validation | Pydantic BaseModel (auto) |
| Async concurrency | CompletableFuture | asyncio.gather() |
| Rate limiting | Semaphore | asyncio.Semaphore |
| Retry + fallback | @Retryable + @Recover | .with_retry().with_fallbacks() |
| Observability | Micrometer + Sleuth | Decorator + logging (or LangSmith or any OTEL) |
| Build tool | Maven/Gradle | pip + requirements.txt |
| Packaging | JAR | Docker + uvicorn |